package com.tca.common.learning.hadoop.mapreduce.cp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the "CP" (comparable &amp; partitioner) MapReduce job.
 *
 * <p>Configures a Hadoop {@link Job} with {@link CpMapper}, {@link CpReducer} and
 * {@link CpPartitioner}, reads the input from the first command-line argument, writes the
 * output to the second one, and terminates the JVM with a status that reflects the job result.
 *
 * @author zhoua
 * @date 2023/11/5 13:03
 */
public class CpDriver {

    /** Exit status used when the required command-line arguments are missing. */
    private static final int EXIT_USAGE = 2;

    /**
     * Number of reduce tasks requested for the job.
     *
     * <p>As a rule this should equal the number of partitions produced by the partitioner:
     * <ul>
     *   <li>if {@code 1 < numReduceTasks < partitionCount}, the job fails;</li>
     *   <li>if {@code numReduceTasks > partitionCount}, some output files are empty;</li>
     *   <li>if {@code numReduceTasks == 1}, the partitioner is not consulted at all.</li>
     * </ul>
     * NOTE(review): the partition count of {@code CpPartitioner} is not visible from this
     * file - confirm that it really yields 3 partitions.
     */
    private static final int NUM_REDUCE_TASKS = 3;

    /**
     * Builds, submits and waits for the CP job.
     *
     * <p>Exits the JVM with {@code 0} when the job succeeds, {@code 1} when it fails, and
     * {@code 2} when the input/output paths were not supplied.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path;
     *             any further arguments are ignored
     * @throws IOException            if the job cannot be created or submitted
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     * @throws ClassNotFoundException if a class required by the job cannot be found
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
        // further down, and before any job object is created.
        if (args == null || args.length < 2) {
            System.err.println("Usage: CpDriver <input path> <output path>");
            System.exit(EXIT_USAGE);
            return;
        }

        // 1. Load the configuration and obtain the job object.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Associate the jar that contains this driver.
        job.setJarByClass(CpDriver.class);

        // 3. Associate the Mapper and Reducer implementations.
        job.setMapperClass(CpMapper.class);
        job.setReducerClass(CpReducer.class);

        // 4. Declare the key/value types emitted by the Mapper.
        job.setMapOutputKeyClass(CpFlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Declare the key/value types of the final (Reducer) output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CpFlowBean.class);

        // Configure the partitioner together with the matching number of reduce tasks
        // (see NUM_REDUCE_TASKS for how the two interact).
        job.setPartitionerClass(CpPartitioner.class);
        job.setNumReduceTasks(NUM_REDUCE_TASKS);

        // 6. Set the input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job, wait for it to finish, and map the result to the exit status.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}
